package com.flink.test;

import com.google.gson.Gson;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Sends simulated game-behavior test data to Kafka.
 *
 * @Description Sends simulated game-behavior test data to Kafka
 * @Author JL
 * @Date 2020/10/10
 * @Version V1.0
 */
public class GameToKafkaMsg {
    // Game names.
    static final String [] upperNames = {"传奇霸主","凤武","神座","百战沙城","雷霆之怒","全民仙战","笑傲江湖","圣魔印"};
    // Weight of each game: upperWeights[k] is how many times upperNames[k] appears in one emission cycle.
    static final Integer [] upperWeights = {7,4,4,3,2,2,1,1};
    // Roles: none, mage, warrior, taoist. Index 0 is the "no role" placeholder.
    static final String [] roleTypes = new String[]{"无", "法师", "战士", "道士"};
    // Events: login, quit, create character, level up.
    static final String [] eventTypes = new String[]{"login", "quit", "create", "upgrade"};

    // One reusable serializer instead of a new instance per message.
    private static final Gson GSON = new Gson();

    /**
     * Generates simulated game-behavior events forever and sends each one, serialized as JSON,
     * to the {@code game_behavior} Kafka topic.
     *
     * <p>Each event carries: game id, server id, user id, event type, role, creation time
     * (formatted) and creation timestamp (epoch milliseconds). Games are emitted by cycling
     * through a list in which every game name is repeated according to its weight.
     *
     * <p>NOTE(review): the loop never terminates and the Kafka producer is never closed here;
     * the process is expected to be killed externally.
     *
     * @param args unused
     * @throws Exception if the sleep between messages is interrupted, or whatever the Kafka
     *                   helper propagates
     */
    public static void main(String[] args) throws Exception {
        // Producer used to send the messages.
        KafkaUtils.KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("192.168.110.35", 9092);
        String topic = "game_behavior";
        List<String> gameList = buildWeightedGameList();
        int size = gameList.size();
        int i = 0;
        // Keep producing simulated events without stopping.
        while (true) {
            long uid = RandomUtils.nextLong(10000000L, 999999999L);
            String gameId = gameList.get(i);
            String serverId = "s" + RandomUtils.nextInt(1, 50);
            String event = eventTypes[RandomUtils.nextInt(0, eventTypes.length)];
            // Boost the number of logins: on every 10th position of the cycle a
            // create/upgrade event is turned into a login event.
            if (i % 10 == 0 && isRoleEvent(event)) {
                event = "login";
            }
            // Only create/upgrade events get a random real role. The role is chosen after the
            // final event is known so that a forced login does not keep a character role.
            String role = roleTypes[0];
            if (isRoleEvent(event)) {
                // Start at 1 to skip the "no role" placeholder; the upper bound is exclusive.
                role = roleTypes[RandomUtils.nextInt(1, roleTypes.length)];
            }
            i++;
            if (i >= size) {
                i = 0;
            }
            // Creation time, both as epoch milliseconds and as a formatted string.
            long createTimeSeries = System.currentTimeMillis();
            String createTime = DateFormatUtils.format(createTimeSeries, "yyyy-MM-dd HH:mm:ss");
            // Game id, server id, user id, event, role, creation time, timestamp.
            GameInfo gameInfo = new GameInfo(gameId, serverId, uid, event, role, createTime, createTimeSeries);
            String gameJson = GSON.toJson(gameInfo);
            // Send the record to the Kafka topic.
            kafkaStreamServer.sendMsg(topic, gameJson);
            // Simulate a varying message rate: sleep a random 100..1999 ms between messages.
            int startInt = 100;
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(startInt, 2000));
        }
    }

    /** Builds the emission cycle: each game name repeated as many times as its weight. */
    private static List<String> buildWeightedGameList() {
        List<String> gameList = new ArrayList<>();
        for (int g = 0; g < upperNames.length; g++) {
            for (int n = 0; n < upperWeights[g]; n++) {
                gameList.add(upperNames[g]);
            }
        }
        return gameList;
    }

    /** Returns whether the event is one that carries a character role (create or upgrade). */
    private static boolean isRoleEvent(String event) {
        return "create".equals(event) || "upgrade".equals(event);
    }

    /**
     * One simulated game-behavior record. It is serialized with Gson, which by default uses the
     * field names as JSON keys, so the field names are part of the message format.
     */
    static class GameInfo{
        // Game name taken from the weighted game list.
        private String gameId;
        // Server id in the form "s" + number.
        private String serverId;
        // Random user id.
        private Long uid;
        // Event type: login, quit, create or upgrade.
        private String event;
        // Character role, or the "no role" placeholder.
        private String role;
        // Creation time formatted as yyyy-MM-dd HH:mm:ss.
        private String createTime;
        // Creation time as epoch milliseconds.
        private Long createTimeSeries;

        /**
         * Creates a record with all fields set.
         *
         * @param gameId           game name
         * @param serverId         server id
         * @param uid              user id
         * @param event            event type
         * @param role             character role
         * @param createTime       formatted creation time
         * @param createTimeSeries creation time in epoch milliseconds
         */
        public GameInfo(String gameId, String serverId, Long uid, String event, String role, String createTime, Long createTimeSeries) {
            this.gameId = gameId;
            this.serverId = serverId;
            this.uid = uid;
            this.event = event;
            this.role = role;
            this.createTime = createTime;
            this.createTimeSeries = createTimeSeries;
        }

        public String getGameId() {
            return gameId;
        }

        public void setGameId(String gameId) {
            this.gameId = gameId;
        }

        public String getServerId() {
            return serverId;
        }

        public void setServerId(String serverId) {
            this.serverId = serverId;
        }

        public Long getUid() {
            return uid;
        }

        public void setUid(Long uid) {
            this.uid = uid;
        }

        public String getEvent() {
            return event;
        }

        public void setEvent(String event) {
            this.event = event;
        }

        public String getRole() {
            return role;
        }

        public void setRole(String role) {
            this.role = role;
        }

        public String getCreateTime() {
            return createTime;
        }

        public void setCreateTime(String createTime) {
            this.createTime = createTime;
        }

        public Long getCreateTimeSeries() {
            return createTimeSeries;
        }

        public void setCreateTimeSeries(Long createTimeSeries) {
            this.createTimeSeries = createTimeSeries;
        }
    }

}
